敲开RxJava 的大门

做了这么久的Android,还没有使用到RxJava。感觉是一件很low 的事情。RxJava 怎么学习,也困扰了自己一段时间。这次蹭着比较闲的一段时间,决定通过源码来学习下RxJava(是的,你没有看错,是通过看源码!!!)。

当然这里需要比较熟悉Java,懂得一些设计模式。给 Android 开发者的 RxJava 详解 对RxJava 的原理以及RxJava 是干什么的都讲的比较清晰。

流程

直接撸起袖子看源码了。

Observable#subscribe 流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class HelloWorld {
public static void main(String[] args) {
Observable.create(new ObservableOnSubscribe<String>() {
@Override public void subscribe(ObservableEmitter<String> emitter) throws Exception {
System.out.println("hello");
emitter.onNext("hello");
System.out.println("world");
emitter.onNext("world");
emitter.onComplete();
}
}).subscribe(new Observer<String>() {
@Override public void onSubscribe(Disposable d) {
System.out.println("onSubscribe");
}
@Override public void onNext(String s) {
System.out.println("onNext " + s);
}
@Override public void onError(Throwable e) {
System.out.println("onError " + e);
}
@Override public void onComplete() {
System.out.println("onComplete");
}
});
}
}

第1串代码是一个最简单的例子,Observable#create创建一个Observable(被观察者),然后Observable#subscribe(即被观察者订阅了观察者)。

Observable#create 创建被观察者

1
2
3
4
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

直接执行到了RxJavaPlugins#onAssembly

1
2
3
4
5
6
7
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}

默认onObservableAssembly为null,所以最终直接返回了一个ObservableCreate对象。所以默认的Observable#create相当于是创建了一个具体的Obsevable(ObservableCreate)。

Observer是直接new 的,所以跳过Observer创建逻辑。

ObservableCreate#subscribe 流程

注意方法的参数是Observer

1
2
3
4
5
6
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
observer = RxJavaPlugins.onSubscribe(this, observer); //生成一个新的Observer
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer); //确认订阅关系
}

第3行,生成一个新的Observer,这里默认返回的就是传入的Observer。

第5行,确定两者的订阅关系。上面已经可以知道,当前的Observable 是ObservableCreate

1
2
3
4
5
6
//ObservableCreate.java
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent); //1.回调Observer#onSubscribe
source.subscribe(parent); //2.回调Observable#subscribe
}

至此,执行的顺序有一些明确了。同时这里出现了一个新对象CreateEmitter。注意这里的Observer对象实际是对例子中Observer对象的包装,所以简单可以理解成直接调用的例子中创建的Observer

  1. Observer#onSubscribe,从这里可以看出onSubscribe只会执行一次。
  2. ObservableOnSubscribe#subscribeObservableOnSubscribe对象实际就是例子中new 的。

Observable#subscribe 回调流程

1
2
3
4
5
6
7
@Override public void subscribe(ObservableEmitter<String> emitter) throws Exception {
System.out.println("hello");
emitter.onNext("hello");
System.out.println("world");
emitter.onNext("world");
emitter.onComplete();
}

相当于开发者自身的业务逻辑代码。这里的emitter 前面已经介绍,是一个CreateEmitter

CreateEmitter#onNext 流程

1
2
3
4
5
6
7
8
9
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}

第7行,直接调用Observer#onNext

然后第二次的onNext 以及最后的onComplete 都会按照类似的逻辑进行处理。

至此,Observable 通知Observer 的流程就比较清晰的展示在我们面前了。当然,从这个流程确实看不出RxJava 到底有什么优势。

map 流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
System.out.println("1");
emitter.onNext(1);
System.out.println("2");
emitter.onNext(2);
emitter.onComplete();
}
}).map(new Function<Integer, String>() {
@Override public String apply(Integer integer) throws Exception {
return "hello world " + integer;
}
}).subscribe(new Observer<String>() {
@Override public void onSubscribe(Disposable d) {
System.out.println("onSubscribe");
}
@Override public void onNext(String s) {
System.out.println("onNext " + s);
}
@Override public void onError(Throwable e) {
System.out.println("onError " + e);
}
@Override public void onComplete() {
System.out.println("onComplete");
}
});

第9行,map逻辑似乎是将输入的Integer转化成String输出。

Observable#map 流程

1
2
3
4
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}

上面最终默认就是返回了一个ObservableMap。所以现在的情况看来,map的操作实际就是生成一个新的Observable(ObservableMap)。

查看构造函数。

1
2
3
4
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}

保留了原始的Observable(source,这里其实是ObservableCreate),同时增加了一个成员变量function。

ObservableMap#subscribe 流程

直接省略,看最终subscribeActual的逻辑。

1
2
3
4
//ObservableMap.java
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}

这里的source 其实就是一个ObservableCreate。最终将传入的Observer和function 包装成ObservableMap$MapObserver

1
2
3
4
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}

MapObserver的中也会保存两个值,一个原始的Observer(actual,可以理解是原始代码中new 的Observer)以及转换的函数mapper。

之后的逻辑就是第一个流程中的,最后会回调到ObservableMap$MapObserver中。

MapObserver#onNext 流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
actual.onNext(null);
return;
}
U v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
actual.onNext(v);
}

第11行,调用Function#apply获取到转换以后的值(当前的例子是:Integer->String)。
第16行,Observer#onNext调用最新的值。

至此,逻辑也非常清晰,map相当于在onNext 之前对数据重新处理。

flatMap 流程

与map 流程非常类似,所以简单说明。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
}
}).flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
final List<String> list = new ArrayList<String>();
for (int i = 0; i < 3; i++) {
list.add("I am value " + integer);
}
return Observable.fromIterable(list).delay(10, TimeUnit.MILLISECONDS);
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(TAG + s);
}
});

flatMap 中范型类型与map 中的不一样。返回的是一个ObservableSource。是通过Observable#fromIterable生成的。这里似乎就可以得出一个结论,flatMap 的实现就是将Observable的一个事件转换成一组事件,同时将这一组事件封装成一个Observable。然后最终通知Observer的其实是一组事件。

生成的是一个ObservableFlatMap对象。

subscribeActual重新封装ObserverObservableFlatMap$MergeObserver。根据上面的分析,最终其实调用的是MergeObserver#onNext

MergeObserver#onNext 流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public void onNext(T t) {
// safeguard against misbehaving sources
if (done) {
return;
}
ObservableSource<? extends U> p;
try {
p = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource"); // 生成一个新的ObservableSource
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
s.dispose();
onError(e);
return;
}
if (maxConcurrency != Integer.MAX_VALUE) {
synchronized (this) {
if (wip == maxConcurrency) {
sources.offer(p);
return;
}
wip++;
}
}
subscribeInner(p); //最终调用逻辑
}

第8行,生成一个新的ObservableSource,具体的apply逻辑,可以由最上面代码获得。

1
2
3
4
5
6
7
8
9
10
new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception { //整体当成一个ObservableSource 了。
final List<String> list = new ArrayList<String>();
for (int i = 0; i < 3; i++) {
list.add("I am value " + integer);
}
return Observable.fromIterable(list).delay(10, TimeUnit.MILLISECONDS);
}
}

即最终调用的ObservableObservable#fromIterable,其实就是一个ObservableFromIterable对象。

回到上面的第24行subscribeInner(p);,最终的调用逻辑其实就是ObservableFromIterable#subscribeActual

线程切换逻辑

RxJava 的精髓就是在线程之间的切换。从一个线程的Observable 发送消息,到另外一个线程的Observer 接收消息。

主要的逻辑就是subscribeOnobserveOn,用来指定相应的线程的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
Observable.create(new ObservableOnSubscribe<String>() {
@Override public void subscribe(ObservableEmitter<String> emitter) throws Exception {
System.out.println("hello " + Thread.currentThread());
emitter.onNext("hello");
System.out.println("world " + Thread.currentThread());
emitter.onNext("world");
emitter.onComplete();
}
})
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
.subscribe(new Observer<String>() {
@Override public void onSubscribe(Disposable d) {
System.out.println("onSubscribe " + Thread.currentThread());
}
@Override public void onNext(String s) {
System.out.println("onNext " + s + " " + Thread.currentThread());
}
@Override public void onError(Throwable e) {
System.out.println("onError " + e);
}
@Override public void onComplete() {
System.out.println("onComplete " + Thread.currentThread());
}
});

上面的例子是假设的是Subscribe 在io 线程,而Observer 在newThread 线程。

这里直接是默认的情况,io 对应的是Schedulers.IO,而newThread 对应的是Schedulers.NEW_THREAD

Observable#subscribeOn(Scheduler) 流程

1
2
3
4
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}

实际是就是返回一个新的Observable(ObservableSubscribeOn),即将ObservableCreate包装成ObservableSubscribeOn

Observable#observeOn(Scheduler) 流程

1
2
3
4
5
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}

相当于是直接返回了一个新的Observable(ObservableObserveOn)。但是这个ObservableObserveOn实际上是封装了上面的ObservableSubscribeOn

所以最后得到的是一个封装了多层的Observable(ObservableObserveOn-> ObservableSubscribeOn->ObservableCreate->new 的ObservableOnSubscribe)。

Observable#subscribe 流程

这里简化了相关逻辑,其实上面Observable#subscribe有具体的逻辑。

ObservableObserveOn#subscribeActual 流程

1
2
3
4
5
6
7
8
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}

相当于是直接调用ObservableSubscribeOn#subscribeActual

ObservableSubscribeOn#subscribeActual 流程

1
2
3
4
5
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}

此时s 代表的是ObservableObserveOn$ObserveOnObserver

第3行,onSubscribe回调的线程是当前线程

重点是第4行。SubscribeTask中封装的是SubscribeTask->SubscribeOnObserver->ObservableObserveOn$ObserveOnObserver->…->最终就是传入的Observer。直接看Scheduler#scheduleDirect逻辑。

Scheduler#scheduleDirect 流程

1
2
3
4
5
6
7
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker(); //创建Worker,注意这里是一个抽象方法
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run); //重新封装run,默认不封装
DisposeTask task = new DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit); //创建新的DisposeTask,然后去运行
return task;
}

上面已经分析出io 对应的是IoScheduler

1
2
3
4
//IoScheduler.java
public Worker createWorker() {
return new EventLoopWorker(pool.get());
}

直接创建的是一个IoScheduler$EventLoopWorker。所以最终的逻辑就是EventLoopWorker#schedule

EventLoopWorker#schedule 流程

1
2
3
4
5
6
7
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (tasks.isDisposed()) {
// don't schedule, we are unsubscribed
return EmptyDisposable.INSTANCE;
}
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}

只需看最终NewThreadWorker#scheduleActual

1
2
3
4
5
6
7
8
9
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f); //
return sr;
}

至此就知道了Observable#subscribe回调逻辑线程是在Observable#subscribeOn(Scheduler)中指定的。

继续看原始代码。

1
2
3
4
5
6
7
@Override public void subscribe(ObservableEmitter<String> emitter) throws Exception {
System.out.println("hello " + Thread.currentThread());
emitter.onNext("hello");
System.out.println("world " + Thread.currentThread());
emitter.onNext("world");
emitter.onComplete();
}

最终是会回调到ObservableObserveOn$ObserveOnObserver#onNext,即被观察者通知观察者。

1
2
3
4
5
6
7
8
9
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}

默认会执行到第6行,将通知事件加入到queue中。

最后第8行,直接调度事件,同上面的分析。最终可以确定Observer#onNextObserver#onComplete回调逻辑线程是Observable#observeOn(Scheduler)指定的。

至此,对subscribeOnobserveOnObservable运行的线程和对Observer运行的线程有了比较直观的了解。回调的逻辑还是按照上面最简单的逻辑,只是在运行的时候封装成Runnable,给不同的线程池调用。

总结

看了一些最简单的基础教程以后,直接写demo 看源码是我自己觉得不错的开源框架的学习方法。后续就要开始在工程中做一些使用的调研了。

RxJava 教程系列
给 Android 开发者的 RxJava 详解
Java开发必会的反编译知识(附支持对Lambda进行反编译的工具)
What’s different in 2.0
RxJava api